b8f563199abc7cc500778b9edb740c155b4bc484,watchdog/src/main/java/com/continuuity/metrics/query/BatchMetricsHandler.java,BatchMetricsHandler,computeQueueLength,#MetricsRequest#,227

Before Change


    // First scan the ack to get an aggregate and also names of queues.
    AggregatesScanner scanner = aggregatesTable.scan(metricsRequest.getContextPrefix(),
                                                     "q.ack",
                                                     metricsRequest.getRunId(),
                                                     metricsRequest.getTagPrefix());
    long ack = 0;
    Set<QueueName> queueNames = Sets.newHashSet();
    while (scanner.hasNext()) {
      AggregatesScanResult scanResult = scanner.next();
      ack += scanResult.getValue();
      queueNames.add(QueueName.from(URI.create(scanResult.getMetric().substring("q.ack.".length()))));
    }

After Change


    // and subtract the two to get queue length.
    AggregatesScanner scanner = aggregatesTable.scan(metricsRequest.getContextPrefix(),
                                                     "process.events.processed",
                                                     metricsRequest.getRunId(),
                                                     "input");

    long processed = 0;
    Set<QueueName> queueNames = Sets.newHashSet();
    while (scanner.hasNext()) {
      AggregatesScanResult scanResult = scanner.next();
      processed += scanResult.getValue();
      // tag is of the form input.[queueURI].  ex: input.queue://PurchaseFlow/reader/queue
      String tag = scanResult.getTag();